Skip to content

Add pubsub support to the Glide driver#44

Merged
liorsve merged 3 commits intovalkey-io:mainfrom
liorsve:spring-pubsub
Jan 19, 2026
Merged

Add pubsub support to the Glide driver#44
liorsve merged 3 commits intovalkey-io:mainfrom
liorsve:spring-pubsub

Conversation

@liorsve
Copy link
Copy Markdown
Collaborator

@liorsve liorsve commented Jan 12, 2026

This PR adds pubsub support to the spring-data-valkey glide driver (implements ValkeyGlideSubscriptions and the pubsub API of ValkeyGlideConnectio).
It does so by setting the glide clients with a pubsub callback, which is later routing messages to the pubsub listeners provided by the app.

@liorsve liorsve force-pushed the spring-pubsub branch 5 times, most recently from 54c2f6b to c62cee3 Compare January 15, 2026 09:00
@liorsve liorsve marked this pull request as ready for review January 15, 2026 09:12
@liorsve liorsve changed the title Spring pubsub Add pubsub to the glide driver Jan 15, 2026
// () -> nativeClient.customCommand(new String[]{"SUNSUBSCRIBE"}).get()
// This is defensive - subscriptions were already supposed to be
// cleaned during subscription.close()
() -> nativeClient.customCommand(new String[]{"UNSUBSCRIBE_BLOCKING"}).get(),
Copy link
Copy Markdown

@jduo jduo Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@liorsve @ikolomi
This can have a big impact on performance as we're executing 3 extra requests when returning a connection to the pool.

We saw when implement #46 that when running the performance test, the UNWATCH request alone halved the speed of the connector.

Ideally we make these unsubscribe calls conditional. At the very least we should batch them (and probably the UNWATCH if it is needed) together or block on the returned futures all-at-once.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO we can remove this entirely if this makes such an impact, as this happens also at subscription.close(). We call it there with theUNSUBSCRIBE_BLOCKINGoption which means that we will wait for server confirmation of unsubscribing before returning the connection to the pull. Removing this

boolean hasListener() {
return messageListener != null;
}
} No newline at end of file
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: No ending newlines in new classes.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

// Unsubscribe from SPECIFIC channels we subscribed to, not ALL
if (channelsToNotify.length > 0) {
try {
sendPubsubCommand("UNSUBSCRIBE_BLOCKING");
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to connection cleanup, if possible would be good to batch the unsubscribe and punsubscribe calls to avoid multiple network calls.

Copy link
Copy Markdown
Collaborator Author

@liorsve liorsve Jan 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid with the new dynamic pubsub impl in glide it's not really possible - We added a pubsub synchronizer in the core that manages subscriptions and subscription confirmation from the server, which doesn't support sending them as a batch. If we would try to bypass that logic with sending custom commands in a batch, the client would get the unsubscribe push notifications from the server and would think it got unsubscribed wrongfully, and so would try to resubscribe. If this is so impactful in terms of performance we can think of adding this functionality to the glide synchronizer in the future. But now that I removed unsubscribe calls on each connection cleanup, the subscription cleanup only happens on a connection that had an active subscription when closing. And if people are using the Listener Container as the spring docs advise, this container only uses 1 connection for all subscriptions in its entire lifetime, so this cleanup will be called once only when the container is destroyed.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets not hide the exceptions

pom.xml Outdated
<xstream>1.4.21</xstream>
<pool>2.11.1</pool>
<valkey.glide>2.1.1</valkey.glide>
<valkey.glide>2.3.0-rc2</valkey.glide>
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Beta this is fine, I assume non-RC will be available by the time we go GA.

This also means we will get the Windows changes (move to JNI) from 2.2.0, which should be tested more thoroughly at some point. Although nothing more to do in this PR.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its ok for betta

Copy link
Copy Markdown
Collaborator

@jeremyprime jeremyprime left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other than concern over performance in connection and subscription cleanup, looks good to me.

@jeremyprime jeremyprime linked an issue Jan 16, 2026 that may be closed by this pull request
Signed-off-by: Lior Sventitzky <liorsve@amazon.com>

try 2 - callback in config

Signed-off-by: Lior Sventitzky <liorsve@amazon.com>

added listener map

Signed-off-by: Lior Sventitzky <liorsve@amazon.com>

changed connection to use AbstractSubscription methods

Signed-off-by: Lior Sventitzky <liorsve@amazon.com>

enabled glide on interfacr tests, added glude specific tests

Signed-off-by: Lior Sventitzky <liorsve@amazon.com>

removed more debug

Signed-off-by: Lior Sventitzky <liorsve@amazon.com>

fixed to unifiedClient

Signed-off-by: Lior Sventitzky <liorsve@amazon.com>

fix

Signed-off-by: Lior Sventitzky <liorsve@amazon.com>

reverted to non arg unsusbcribes, removed stub subscription in config (won't work with old glide)

Signed-off-by: Lior Sventitzky <liorsve@amazon.com>

started cleanup for PR

Signed-off-by: Lior Sventitzky <liorsve@amazon.com>

added blocking to subscription commands

Signed-off-by: Lior Sventitzky <liorsve@amazon.com>

change to rc version, remove protobuf dependency

Signed-off-by: Lior Sventitzky <liorsve@amazon.com>
Signed-off-by: Lior Sventitzky <liorsve@amazon.com>
Copy link
Copy Markdown
Collaborator

@ikolomi ikolomi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approved pending comments resolution


public ValkeyGlideClusterConnection(ClusterGlideClientAdapter clusterAdapter, @Nullable ValkeyGlideConnectionFactory factory) {
this(clusterAdapter, factory, Duration.ofMillis(100));
public ValkeyGlideClusterConnection(ClusterGlideClientAdapter clusterAdapter,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

empty line

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just appears this way on github, see + signs beside

public ValkeyGlideClusterConnection(ClusterGlideClientAdapter clusterAdapter,
@Nullable ValkeyGlideConnectionFactory factory,
@Nullable DelegatingPubSubListener pubSubListener,
Duration cacheTimeout) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not as the last param?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed


public ValkeyGlideClusterConnection(ClusterGlideClientAdapter clusterAdapter, @Nullable ValkeyGlideConnectionFactory factory, Duration cacheTimeout) {
super(clusterAdapter, factory);
public ValkeyGlideClusterConnection(ClusterGlideClientAdapter clusterAdapter,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

empty line

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just looks like it bc of github

private boolean earlyStartup = true;
private int phase = 0;

private final Map<Object, DelegatingPubSubListener> clientListenerMap = new ConcurrentHashMap<>();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please comment what is the Object and what for

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added


protected final UnifiedGlideClient unifiedClient;
protected final @Nullable ValkeyGlideConnectionFactory factory;
protected final @Nullable DelegatingPubSubListener pubSubListener;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment why do we "Delegate" (we are using glide's callback which is supplied on the creation , yada yda)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

/**
* Send a pub/sub command directly to the client using GlideString.
*/
private void sendPubsubCommand(String command, byte[]... args) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

args -> channels

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed


try {
client.customCommand(glideArgs);
} catch (InterruptedException e) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one case is cleaner

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

String channelBase = "test:concurrent:isolation";

ExecutorService executor = Executors.newFixedThreadPool(threadCount);
CountDownLatch allDone = new CountDownLatch(threadCount);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

allDone probably not required - use
executorService.shutdown();
executorService.awaitTermination(TIMEOUT_SECONDS, TimeUnit.SECONDS);

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

import io.valkey.springframework.data.valkey.connection.lettuce.LettuceConnectionFactory;
import io.valkey.springframework.data.valkey.connection.lettuce.extension.LettuceConnectionFactoryExtension;
import io.valkey.springframework.data.valkey.connection.valkeyglide.ValkeyGlideConnectionFactory;
import io.valkey.springframework.data.valkey.connection.valkeyglide.extension.ValkeyGlideConnectionFactoryExtension;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new line?

Copy link
Copy Markdown
Collaborator Author

@liorsve liorsve Jan 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not really there just appears this way

return parameters;
}


Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new line

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

@ikolomi ikolomi added the enhancement New feature or request label Jan 18, 2026
@ikolomi ikolomi added this to the Beta 0.2.0 milestone Jan 18, 2026
Signed-off-by: Lior Sventitzky <liorsve@amazon.com>
@liorsve liorsve changed the title Add pubsub to the glide driver Add pubsub support to the Glide driver Jan 18, 2026
@liorsve liorsve merged commit f4529e4 into valkey-io:main Jan 19, 2026
51 of 52 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Integrate reliable and dynamic PubSub from Valkey-Glide

4 participants